package com.paic.common.middleware.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 从HBase中读取数据到Spark中
 */
/**
 * Reads rows from an HBase table into a Spark RDD.
 *
 * The [[Scan]] is serialized into the Hadoop Configuration under
 * `TableInputFormat.SCAN` so each executor can rebuild it when reading
 * its region split.
 */
object HbaseDataToSparkRDD {

  /**
   * Entry point: scans an HBase table with a column-value filter and
   * prints every matching row.
   *
   * NOTE(review): "tableName", "columnFamily", "qualify" look like
   * placeholders — confirm the real table/column names before running.
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sparkContext = new SparkContext(sparkConf)
    try {
      // Keep only rows whose column value is >= 18.
      // setFilterIfMissing(true): without it, HBase's default is to ALSO
      // emit rows that lack the column entirely, which defeats the filter.
      val filter = new SingleColumnValueFilter(
        Bytes.toBytes("columnFamily"),
        Bytes.toBytes("qualify"),
        CompareOp.GREATER_OR_EQUAL,
        Bytes.toBytes(18))
      filter.setFilterIfMissing(true)

      val scan = new Scan()
      scan.setFilter(filter)

      val conf = getHbaseConfiguration("tableName", scan)
      val resultRDD = sparkContext.newAPIHadoopRDD(
        conf,
        classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])

      // Cache BEFORE the first action so that both count() and foreach()
      // reuse the scanned data instead of re-reading HBase twice.
      resultRDD.cache()
      resultRDD.count()
      resultRDD.foreach {
        case (_, result) =>
          // Assumes the row key and "age" cells were written with
          // Bytes.toBytes(Int) — Bytes.toInt will fail on other encodings.
          val key = Bytes.toInt(result.getRow)
          val name = Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name")))
          val age = Bytes.toInt(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("age")))
          // Fixed format: the original concatenation produced "name<value>Age:"
          // with no separators between the fields.
          println(s"row key:$key name:$name Age:$age")
      }
    } finally {
      // Always release cluster resources, even when the job fails.
      sparkContext.stop()
    }
  }

  /**
   * Builds a Hadoop Configuration pointing `TableInputFormat` at the
   * given table, with the scan serialized into `TableInputFormat.SCAN`.
   *
   * @param tableName HBase table to read from
   * @param scan      scan (filters, column selection) applied per split
   * @return configuration ready for `SparkContext.newAPIHadoopRDD`
   */
  def getHbaseConfiguration(tableName: String, scan: Scan): Configuration = {
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set(TableInputFormat.SCAN, scanToString(scan))
    conf
  }

  /**
   * Serializes a [[Scan]] to the Base64-encoded protobuf string form
   * expected by `TableInputFormat.SCAN`.
   */
  def scanToString(scan: Scan): String = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
}
